home *** CD-ROM | disk | FTP | other *** search
- # -*- Mode: Python; tab-width: 4 -*-
- # $Id: filesys.py,v 1.5 2000/06/02 14:22:48 brian Exp $
- # Author: Sam Rushing <rushing@nightmare.com>
- #
- # Generic filesystem interface.
- #
-
- # We want to provide a complete wrapper around any and all
- # filesystem operations.
-
- # this class is really just for documentation,
- # identifying the API for a filesystem object.
-
- # opening files for reading, and listing directories, should
- # return a producer.
-
class abstract_filesystem:
    """Documentation-only description of the filesystem object API.

    Every method here is a stub returning None; concrete filesystems
    provide the real behaviour.  Opening files for reading, and listing
    directories, should return a producer.
    """

    def __init__ (self):
        """Nothing to set up for the abstract interface."""

    def current_directory (self):
        """Return a string representing the current directory."""

    def listdir (self, path, long=0):
        """Return a listing of the directory at 'path'.

        The empty string indicates the current directory.  If 'long'
        is set, return a list of (name, stat_info) tuples instead.
        """

    def open (self, path, mode):
        """Return an open file object."""

    def stat (self, path):
        """Return the equivalent of os.stat() on the given path."""

    def isdir (self, path):
        """Does the path represent a directory?"""

    def isfile (self, path):
        """Does the path represent a plain file?"""

    def cwd (self, path):
        """Change the working directory."""

    def cdup (self):
        """Change to the parent of the current directory."""

    def longify (self, path):
        """Return a 'long' representation of the filename
        [for the output of the LIST command]."""
-
- # standard wrapper around a unix-like filesystem, with a 'false root'
- # capability.
-
- # security considerations: can symbolic links be used to 'escape' the
- # root? should we allow it? if not, then we could scan the
- # filesystem on startup, but that would not help if they were added
- # later. We will probably need to check for symlinks in the cwd method.
-
- # what to do if wd is an invalid directory?
-
- import os
- import stat
-
- import string
-
def safe_stat (path):
    """Stat 'path', tolerating failure.

    Returns a (path, stat_result) tuple, or None when the path cannot
    be stat'ed (missing file, permission denied, malformed name).
    """
    try:
        return (path, os.stat (path))
    except (OSError, ValueError):
        # Only swallow the failures os.stat itself produces; a bare
        # 'except' would also hide KeyboardInterrupt and SystemExit.
        return None
-
- import regex
- import regsub
- import glob
-
class os_filesystem:
    """Wrapper around the host filesystem with a 'false root'.

    Paths handed to the methods use '/' as separator and are interpreted
    relative to the virtual working directory self.wd; translate() maps
    them to real paths underneath self.root.
    """
    path_module = os.path

    # set this to zero if you want to disable pathname globbing.
    # [we currently don't glob, anyway]
    do_globbing = 1

    def __init__ (self, root, wd='/'):
        """Remember the real root directory and the virtual working dir."""
        self.root = root
        self.wd = wd

    def current_directory (self):
        """Return the virtual working directory."""
        return self.wd

    def isfile (self, path):
        """Return true if 'path' (relative to wd) is a plain file."""
        p = self.normalize (self.path_module.join (self.wd, path))
        return self.path_module.isfile (self.translate (p))

    def isdir (self, path):
        """Return true if 'path' (relative to wd) is a directory."""
        p = self.normalize (self.path_module.join (self.wd, path))
        return self.path_module.isdir (self.translate (p))

    def cwd (self, path):
        """Change the virtual working directory to 'path'.

        Returns 1 on success, 0 if the target is not a directory or the
        process is not allowed to enter it.
        """
        p = self.normalize (self.path_module.join (self.wd, path))
        translated_path = self.translate (p)
        if not self.path_module.isdir (translated_path):
            return 0
        old_dir = os.getcwd()
        # temporarily change to that directory, in order
        # to see if we have permission to do so.
        try:
            os.chdir (translated_path)
        except OSError:
            return 0
        self.wd = p
        # the probe succeeded: put the process back where it was.
        os.chdir (old_dir)
        return 1

    def cdup (self):
        """Change to the parent of the virtual working directory."""
        return self.cwd ('..')

    def listdir (self, path, long=0):
        """Return a list_producer for the directory at 'path'.

        With 'long' false the producer yields bare names.  Otherwise it
        yields (name, stat_info) tuples rendered through self.longify;
        entries that cannot be stat'ed are left out.
        """
        p = self.translate (path)
        # I think we should glob, but limit it to the current
        # directory only.
        ld = os.listdir (p)
        if not long:
            return list_producer (ld, 0, None)
        old_dir = os.getcwd()
        try:
            # names from os.listdir are relative, so stat them from
            # inside the directory (note: chdir is process-wide).
            os.chdir (p)
            # if os.stat fails we ignore that file.  Build a real list:
            # the producer needs len() and slicing.
            result = [entry for entry in map (safe_stat, ld) if entry]
        finally:
            os.chdir (old_dir)
        return list_producer (result, 1, self.longify)

    # TODO: implement a cache w/timeout for stat()
    def stat (self, path):
        """Return os.stat() of the translated path."""
        p = self.translate (path)
        return os.stat (p)

    def open (self, path, mode):
        """Open the translated path with the builtin open()."""
        p = self.translate (path)
        return open (p, mode)

    def unlink (self, path):
        """Remove the file at the translated path."""
        p = self.translate (path)
        return os.unlink (p)

    def mkdir (self, path):
        """Create a directory at the translated path."""
        p = self.translate (path)
        return os.mkdir (p)

    def rmdir (self, path):
        """Remove the directory at the translated path."""
        p = self.translate (path)
        return os.rmdir (p)

    # utility methods
    def normalize (self, path):
        """Collapse repeated slashes and '.'/'..' elements in 'path'."""
        # watch for the ever-sneaky '/+' path element.  Done with plain
        # string methods so the long-removed 'regsub' module is not needed.
        while '//' in path:
            path = path.replace ('//', '/')
        p = self.path_module.normpath (path)
        # remove 'dangling' cdup's.  Only a real '..' component counts;
        # a name that merely starts with two dots ('/..foo') is kept.
        if p == '/..' or p[:4] == '/../':
            p = '/'
        return p

    def translate (self, path):
        """Map a virtual path to the real path underneath self.root."""
        # we need to join together three separate
        # path components, and do it safely.
        # <real_root>/<current_directory>/<path>
        # use the operating system's path separator.
        path = os.sep.join (path.split ('/'))
        p = self.normalize (self.path_module.join (self.wd, path))
        # p[1:] drops the leading separator so join() cannot discard root.
        p = self.normalize (self.path_module.join (self.root, p[1:]))
        return p

    def longify (self, path_and_stat):
        """Render one (path, stat_info) tuple as a unix-style LIST line."""
        # unpacked here rather than in the signature: tuple parameters
        # are a syntax error on Python 3.
        path, stat_info = path_and_stat
        return unix_longify (path, stat_info)

    def __repr__ (self):
        return '<unix-style fs root:%s wd:%s>' % (
            self.root,
            self.wd
            )
-
if os.name == 'posix':

    class unix_filesystem (os_filesystem):
        """Plain os_filesystem under its unix name."""
        pass

    class schizophrenic_unix_filesystem (os_filesystem):
        """os_filesystem that performs its operations as another user.

        'persona' is a (uid, gid) pair.  cwd, cdup, open and listdir
        switch the effective ids to that pair for the duration of the
        call and switch back afterwards.  The default (None, None)
        persona disables the switching.
        """
        # ids of the process, captured when the class is defined
        PROCESS_UID = os.getuid()
        PROCESS_EUID = os.geteuid()
        PROCESS_GID = os.getgid()
        PROCESS_EGID = os.getegid()

        def __init__ (self, root, wd='/', persona=(None, None)):
            os_filesystem.__init__ (self, root, wd)
            self.persona = persona

        def become_persona (self):
            """Switch the effective gid/uid to the configured persona."""
            # compare by value: an identity test ('is not') against a
            # tuple literal does not detect the (None, None) default.
            if self.persona != (None, None):
                uid, gid = self.persona
                # the order of these is important!
                os.setegid (gid)
                os.seteuid (uid)

        def become_nobody (self):
            """Switch the effective ids back to the process's own ids."""
            if self.persona != (None, None):
                # NOTE(review): this restores the *real* uid/gid captured
                # above, not PROCESS_EUID/PROCESS_EGID - confirm intended.
                os.seteuid (self.PROCESS_UID)
                os.setegid (self.PROCESS_GID)

        # cwd, cdup, open, listdir
        def cwd (self, path):
            try:
                self.become_persona()
                return os_filesystem.cwd (self, path)
            finally:
                self.become_nobody()

        def cdup (self, path=None):
            """Change to the parent directory as the persona.

            'path' is ignored; it is optional so that the call matches
            os_filesystem.cdup(), which takes no argument.
            """
            try:
                self.become_persona()
                return os_filesystem.cdup (self)
            finally:
                self.become_nobody()

        def open (self, filename, mode):
            try:
                self.become_persona()
                return os_filesystem.open (self, filename, mode)
            finally:
                self.become_nobody()

        def listdir (self, path, long=0):
            try:
                self.become_persona()
                return os_filesystem.listdir (self, path, long)
            finally:
                self.become_nobody()
-
- # This hasn't been very reliable across different platforms.
- # maybe think about a separate 'directory server'.
- #
- # import posixpath
- # import fcntl
- # import FCNTL
- # import select
- # import asyncore
- #
- # # pipes /bin/ls for directory listings.
- # class unix_filesystem (os_filesystem):
- # pass
- # path_module = posixpath
- #
- # def listdir (self, path, long=0):
- # p = self.translate (path)
- # if not long:
- # return list_producer (os.listdir (p), 0, None)
- # else:
- # command = '/bin/ls -l %s' % p
- # print 'opening pipe to "%s"' % command
- # fd = os.popen (command, 'rt')
- # return pipe_channel (fd)
- #
- # # this is both a dispatcher, _and_ a producer
- # class pipe_channel (asyncore.file_dispatcher):
- # buffer_size = 4096
- #
- # def __init__ (self, fd):
- # asyncore.file_dispatcher.__init__ (self, fd)
- # self.fd = fd
- # self.done = 0
- # self.data = ''
- #
- # def handle_read (self):
- # if len (self.data) < self.buffer_size:
- # self.data = self.data + self.fd.read (self.buffer_size)
- # #print '%s.handle_read() => len(self.data) == %d' % (self, len(self.data))
- #
- # def handle_expt (self):
- # #print '%s.handle_expt()' % self
- # self.done = 1
- #
- # def ready (self):
- # #print '%s.ready() => %d' % (self, len(self.data))
- # return ((len (self.data) > 0) or self.done)
- #
- # def more (self):
- # if self.data:
- # r = self.data
- # self.data = ''
- # elif self.done:
- # self.close()
- # self.downstream.finished()
- # r = ''
- # else:
- # r = None
- # #print '%s.more() => %s' % (self, (r and len(r)))
- # return r
-
- # For the 'real' root, we could obtain a list of drives, and then
- # use that. Doesn't win32 provide such a 'real' filesystem?
- # [yes, I think something like this "\\.\c\windows"]
-
class msdos_filesystem (os_filesystem):
    """os_filesystem whose long listings use the MSDOS-style format."""

    def longify (self, path_and_stat):
        """Render one (path, stat_info) tuple with msdos_longify."""
        # unpacked here rather than in the signature: tuple parameters
        # are a syntax error on Python 3.
        path, stat_info = path_and_stat
        return msdos_longify (path, stat_info)
-
- # A merged filesystem will let you plug other filesystems together.
- # We really need the equivalent of a 'mount' capability - this seems
- # to be the most general idea. So you'd use a 'mount' method to place
- # another filesystem somewhere in the hierarchy.
-
- # Note: this is most likely how I will handle ~user directories
- # with the http server.
-
class merged_filesystem:
    """Placeholder for a filesystem assembled from other filesystems."""

    def __init__ (self, *fsys):
        # not implemented yet: the filesystems passed in are ignored.
        return None
-
- # this matches the output of NT's ftp server (when in
- # MSDOS mode) exactly.
-
def msdos_longify (file, stat_info):
    """Return an MSDOS-style listing line for 'file'.

    'stat_info' is indexed with the stat.ST_* constants (mode, mtime,
    size).  Directories are flagged with '<DIR>'.
    """
    dir_marker = '<DIR>'
    if not stat.S_ISDIR (stat_info[stat.ST_MODE]):
        # blank placeholder of the same width, so that the size and
        # name columns line up for files and directories alike.
        dir_marker = ' ' * len (dir_marker)
    date = msdos_date (stat_info[stat.ST_MTIME])
    return '%s %s %8d %s' % (
        date,
        dir_marker,
        stat_info[stat.ST_SIZE],
        file
        )
-
def msdos_date (t):
    """Format timestamp 't' (seconds since the epoch) as 'MM-DD-YY HH:MMxM'.

    The time is taken in UTC (time.gmtime) and shown on a 12-hour
    clock.  A timestamp gmtime cannot convert is shown as the epoch.
    """
    try:
        info = time.gmtime (t)
    except (ValueError, OverflowError, OSError):
        info = time.gmtime (0)
    # year, month, day, hour, minute, second, ...
    # The gmtime result is immutable, so work on a local copy of the hour.
    hour = info[3]
    if hour > 11:
        merid = 'PM'
    else:
        merid = 'AM'
    # 12-hour clock: 0 -> 12 (AM), 12 stays 12 (PM), 13..23 -> 1..11
    hour = hour % 12 or 12
    return '%02d-%02d-%02d %02d:%02d%s' % (
        info[1],
        info[2],
        info[0]%100,
        hour,
        info[4],
        merid
        )
-
# Month abbreviations, indexed by (month number - 1).
months = [
    'Jan', 'Feb', 'Mar', 'Apr',
    'May', 'Jun', 'Jul', 'Aug',
    'Sep', 'Oct', 'Nov', 'Dec',
    ]

# Maps one octal digit of a permission mode to its 'rwx' triple.
mode_table = {
    '0': '---', '1': '--x', '2': '-w-', '3': '-wx',
    '4': 'r--', '5': 'r-x', '6': 'rw-', '7': 'rwx',
    }
-
- import time
-
def unix_longify (file, stat_info):
    """Return an 'ls -l'-style listing line for 'file'.

    'stat_info' is indexed with the stat.ST_* constants (mode, nlink,
    uid, gid, size, mtime).  uid and gid are shown numerically.
    """
    # for now, only pay attention to the lower bits:
    # the last three octal digits are the user/group/other permissions.
    octal_digits = ('%o' % stat_info[stat.ST_MODE])[-3:]
    mode = ''.join ([mode_table[digit] for digit in octal_digits])
    if stat.S_ISDIR (stat_info[stat.ST_MODE]):
        dirchar = 'd'
    else:
        dirchar = '-'
    # int() rather than long(): the 'long' builtin is gone in Python 3.
    date = ls_date (int (time.time()), stat_info[stat.ST_MTIME])
    return '%s%s %3d %-8s %-8s %8d %s %s' % (
        dirchar,
        mode,
        stat_info[stat.ST_NLINK],
        stat_info[stat.ST_UID],
        stat_info[stat.ST_GID],
        stat_info[stat.ST_SIZE],
        date,
        file
        )
-
- # Emulate the unix 'ls' command's date field.
- # it has two formats - if the date is more than 180
- # days in the past, then it's like this:
- # Oct 19 1995
- # otherwise, it looks like this:
- # Oct 19 17:33
-
def ls_date (now, t):
    """Format timestamp 't' like the date field of the unix 'ls' command.

    'now' and 't' are seconds since the epoch; the fields are taken in
    UTC (time.gmtime).  If 't' lies more than 180 days before 'now' the
    result is 'Mon DD YYYY', otherwise 'Mon DD HH:MM'.  A timestamp
    gmtime cannot convert is shown as the epoch.
    """
    try:
        info = time.gmtime (t)
    except (ValueError, OverflowError, OSError):
        info = time.gmtime (0)
    # 180 days, spelled out: the old literal 15,600,000 was not
    # actually 86,400 * 180 (which is 15,552,000).
    if (now - t) > 180 * 86400:
        return '%s %2d %d' % (
            months[info[1]-1],
            info[2],
            info[0]
            )
    else:
        return '%s %2d %02d:%02d' % (
            months[info[1]-1],
            info[2],
            info[3],
            info[4]
            )
-
- # ===========================================================================
- # Producers
- # ===========================================================================
-
class list_producer:
    """Producer that emits a directory listing a few lines at a time.

    'file_list' holds the entries.  When 'long' is true each entry is
    passed through 'longify' to obtain its text; otherwise the entries
    must already be strings.  Lines are terminated with CRLF.
    """

    # number of entries emitted per call to more()
    chunk_size = 50

    def __init__ (self, file_list, long, longify):
        # take a real list: ready() and more() need truth tests and
        # slicing, which a lazy iterable would not support.
        self.file_list = list (file_list)
        self.long = long
        self.longify = longify
        self.done = 0

    def ready (self):
        """Return 1 while entries remain; otherwise mark done, return 0."""
        if self.file_list:
            return 1
        self.done = 1
        return 0

    # this should do a pushd/popd
    def more (self):
        """Return the next chunk of listing text, or '' when exhausted."""
        if not self.file_list:
            return ''
        # do a few at a time
        bunch = self.file_list[:self.chunk_size]
        if self.long:
            bunch = [self.longify (entry) for entry in bunch]
        self.file_list = self.file_list[self.chunk_size:]
        return '\r\n'.join (bunch) + '\r\n'
-
-